
Conversation

@zikangh (Contributor) commented Oct 7, 2025

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR is Part I of implementing SparkMicroBatchStream.getFileChanges() to support Kernel-based DSv2 Delta streaming (M1 milestone).

  • Reads Delta commit range and converts actions to KernelIndexedFile objects with proper indexing and sentinel values.
  • Basic 1-pass commit validation.

Follow-ups include schema evolution support and initial snapshot support (marked TODO(M1) in the code).
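
As context for the indexing and sentinel scheme mentioned above, here is a minimal sketch of what such an indexed-file record could look like. The class shape, field names, and the "null path means sentinel" convention are illustrative assumptions, not the PR's actual KernelIndexedFile; the real sentinel indices come from DeltaSourceOffset.

// Illustrative sketch only -- not the PR's KernelIndexedFile.
public final class IndexedFileSketch {
  private final long version;       // Delta commit version this entry belongs to
  private final long index;         // offset index within the version, as used by DeltaSourceOffset
  private final String addFilePath; // null for the sentinel entries that mark version boundaries

  public IndexedFileSketch(long version, long index, String addFilePath) {
    this.version = version;
    this.index = index;
    this.addFilePath = addFilePath;
  }

  /** Sentinel entries carry no data file; they only delimit a commit version. */
  public boolean isSentinel() {
    return addFilePath == null;
  }

  public long getVersion() { return version; }
  public long getIndex() { return index; }
  public String getAddFilePath() { return addFilePath; }
}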

How was this patch tested?

Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).

Does this PR introduce any user-facing changes?

No

@zikangh mentioned this pull request Oct 7, 2025
@zikangh changed the title from "[kernel dsv2 streaming] Add logic that reads the delta commit log in preparation for streaming read (Part I)" to "[kernel dsv2 streaming] Add logic that reads the delta commit log to determine offsets for streaming read (Part I)" Oct 7, 2025
@zikangh changed the title to "[kernel-spark] Add getFileChanges() to support Kernel-based DSv2 streaming (Part I)" Oct 7, 2025
@zikangh (Contributor, Author) commented Oct 8, 2025

Hi @huan233usc @gengliangwang @jerrypeng, could you please help review this PR?

*
* <p>Indexed: refers to the index in DeltaSourceOffset, assigned by the streaming engine.
*/
public class KernelIndexedFile {
Collaborator:

Nit: just call it IndexedFile? Kernel is just an impl detail.

Contributor:

+1

Contributor (Author):

Done.

* from the first row (rowId=0).
*/
public static long getVersion(ColumnarBatch batch) {
assert batch.getSize() > 0;
Contributor:

let's follow https://github.com/delta-io/delta/blob/master/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/RowBackedAction.java#L46 and create a new helper function here

protected int getFieldIndex(String fieldName) {
  int index = row.getSchema().indexOf(fieldName);
  checkArgument(index >= 0, "Field '%s' not found in schema: %s", fieldName, row.getSchema());
  return index;
}

Contributor (Author):

Done. Thank you!
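
For illustration, a minimal sketch of how getVersion could adopt the suggested helper, assuming the Kernel ColumnarBatch/StructType API and the internal Preconditions.checkArgument used by RowBackedAction; the top-level "version" column name is an assumption here, not confirmed by this thread.

import static io.delta.kernel.internal.util.Preconditions.checkArgument;

import io.delta.kernel.data.ColumnarBatch;

// Sketch of the agreed-upon pattern; not the PR's actual StreamingHelper.
public final class StreamingHelperSketch {

  // Mirrors the RowBackedAction helper linked above, but resolves a field on a batch schema.
  private static int getFieldIndex(ColumnarBatch batch, String fieldName) {
    int index = batch.getSchema().indexOf(fieldName);
    checkArgument(index >= 0, "Field '%s' not found in schema: %s", fieldName, batch.getSchema());
    return index;
  }

  // Reads the commit version from the first row (rowId = 0), per the javadoc above.
  public static long getVersion(ColumnarBatch batch) {
    checkArgument(batch.getSize() > 0, "Expected a non-empty batch, got size %s", batch.getSize());
    return batch.getColumnVector(getFieldIndex(batch, "version")).getLong(0);
  }
}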

int id = i * 100 + j;
insertValues.append(String.format("(%d, 'User%d')", id, id));
}
spark.sql(String.format("INSERT INTO %s VALUES %s", testTableName, insertValues.toString()));
Contributor (Author):

I think it might be overkill at this point, especially when there are so many unsupported table types. I agree we should eventually add a test like this. Added a TODO.

continue;
}
long version = StreamingHelper.getVersion(batch);
validateCommit(batch, version, endOffset);
Collaborator:

Should validation happen after processing the previous version?

Contributor (Author):

Done, thanks!

}
CommitRange commitRange = builder.build(engine);
// Required by kernel: perform protocol validation by creating a snapshot at startVersion.
Snapshot startSnapshot =
Contributor:

Why do you need to get a snapshot even if we start reading from a specific delta log version?

Contributor (Author):

It's required by the kernel to fetch actions:

* @param startSnapshot the snapshot for startVersion, required to ensure the table is readable by

Snapshot startSnapshot =
TableManager.loadSnapshot(tablePath).atVersion(startVersion).build(engine);
// TODO(M1): This is not working with ccv2 table
Set<DeltaAction> actionSet = new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE));
Contributor:

Ideally this is a class static variable so it will only be allocated once per query run.

Contributor:

Why do we also need to get the "REMOVE" actions?

Contributor (Author):

Done.

See validateCommit() -- the current behavior of the delta connector is that we fail the pipeline if any commit contains a REMOVE (unless skipDeletes or skipChangeCommits are specified). Streaming jobs are meant to process append-only data

}
long version = StreamingHelper.getVersion(batch);
// TODO(M1): migrate to kernel's commit-level iterator (WIP).
// The current one-pass algorithm assumes REMOVE actions precede ADD actions
Contributor:

Where are you filtering out the "REMOVE" actions?

Contributor (Author):

We throw an error whenever we encounter a REMOVE -- because ETL jobs should process append-only data. We fail explicitly to avoid correctness issues.
In M2, we'll also support ignoreChangedCommits and ignoreDeletes to skip these commits silently.
To properly handle update actions, users would need to use CDF.
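
To make the failure mode concrete, here is a rough, self-contained sketch of the append-only guard described above; the "remove" column name, the two boolean flags, and the exception type are illustrative assumptions, not the connector's actual validateCommit code.

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;

// Rough sketch of the append-only guard described above; names are illustrative assumptions.
public final class RemoveGuardSketch {

  public static void failOnRemoves(
      ColumnarBatch batch, long version, boolean ignoreDeletes, boolean ignoreChangeCommits) {
    if (ignoreDeletes || ignoreChangeCommits) {
      return; // user explicitly opted into skipping commits with deletes/changes
    }
    int removeOrdinal = batch.getSchema().indexOf("remove");
    if (removeOrdinal < 0) {
      return; // batch was read without REMOVE actions
    }
    ColumnVector removeVector = batch.getColumnVector(removeOrdinal);
    for (int rowId = 0; rowId < batch.getSize(); rowId++) {
      if (!removeVector.isNullAt(rowId)) {
        throw new UnsupportedOperationException(
            String.format(
                "Commit version %d contains a REMOVE action. Streaming sources expect "
                    + "append-only tables; failing explicitly to avoid correctness issues.",
                version));
      }
    }
  }
}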

@huan233usc requested a review from tdas October 9, 2025 21:02
}

Row addFileRow = StructRow.fromStructVector(addVector, rowId);
if (addFileRow == null) {
Collaborator:

Should we throw here, given that addVector.isNullAt(rowId) is false?

Contributor (Author):

We call this method even on REMOVE rows in extractIndexedFilesFromBatch

Collaborator:

if this is a remove row, iiuc the method will return on L175? Did I miss something?

Contributor (Author):

Ah yes. You are right. Done.

// A version can be split across multiple batches.
long currentVersion = -1;
long currentIndex = 0;
List<IndexedFile> currentVersionFiles = new ArrayList<>();
Contributor:

It's more performant to use a linked list if we don't actually know the size the list will be.

Contributor (Author):

I don't think so. The per-node overhead of a linked list outweighs the cost of resizing an ArrayList, especially for my use case (addAll(), add(), clear()). We would maybe get a performance benefit if we did a lot of deletes and inserts at the beginning or in the middle, which we don't.


// TODO(#5319): check trackingMetadataChange flag and compare with stream metadata.

result.addAll(dataFiles);
Contributor:

This is not very efficient. "dataFiles" should just be a linked list and you can append and prepend in constant time.

Contributor (Author):

ditto -- I don't think linked lists would help here.

// The current one-pass algorithm assumes REMOVE actions precede ADD actions
// in a commit; we should implement a proper two-pass approach once kernel API is ready.

if (currentVersion != -1 && version != currentVersion) {
Contributor:

This logic here is kind of confusing. All you are trying to do is sandwich the index files between the BASE_INDEX sentinel file and the END_INDEX sentinel file, right? Why not simplify the logic to be:

allIndexedFiles.add(beginSentinelFile)
allIndexedFiles.addAll(allIndexFilesInBatch)
allIndexedFiles.add(endSentinelFile)

Contributor (Author):

We only insert sentinels before and after a version. The code is complex because the kernel breaks up a commit into batches (ColumnarBatch) to avoid overwhelming memory. I reorganized the code a bit to make this clear. Could you take another look?

Contributor:

Chatted offline with @zikangh. I had a concern about why we needed another list, i.e. "currentVersionFiles", to buffer files for this version instead of directly appending to allIndexedFiles. The reason is that in the next PR, she is going to introduce the ability to skip whole commits.
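
To make the batching discussion easier to follow, here is a self-contained toy model of the buffering flow; plain strings stand in for IndexedFile and ColumnarBatch, and all names are illustrative assumptions rather than the PR's code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the per-version buffering discussed above: a commit's files can
// arrive split across several batches, so they are buffered and only flushed
// between a BASE sentinel and an END sentinel once the version changes.
public final class SentinelBufferingSketch {

  private static void flushVersion(long version, List<String> files, List<String> out) {
    out.add("v" + version + ":BASE_SENTINEL");
    out.addAll(files);
    out.add("v" + version + ":END_SENTINEL");
  }

  public static void main(String[] args) {
    // Each entry simulates one ColumnarBatch; version 1 is split across two batches.
    long[] batchVersions = {1L, 1L, 2L};
    List<List<String>> batchFiles =
        Arrays.asList(
            Arrays.asList("v1:a.parquet"),
            Arrays.asList("v1:b.parquet"),
            Arrays.asList("v2:c.parquet"));

    List<String> result = new ArrayList<>();
    List<String> currentVersionFiles = new ArrayList<>();
    long currentVersion = -1;

    for (int i = 0; i < batchVersions.length; i++) {
      long version = batchVersions[i];
      if (currentVersion != -1 && version != currentVersion) {
        // Previous version is complete: emit it sandwiched between sentinels.
        flushVersion(currentVersion, currentVersionFiles, result);
        currentVersionFiles.clear();
      }
      currentVersion = version;
      currentVersionFiles.addAll(batchFiles.get(i));
    }
    if (currentVersion != -1) {
      flushVersion(currentVersion, currentVersionFiles, result);
    }
    // Prints: [v1:BASE_SENTINEL, v1:a.parquet, v1:b.parquet, v1:END_SENTINEL,
    //          v2:BASE_SENTINEL, v2:c.parquet, v2:END_SENTINEL]
    System.out.println(result);
  }
}

Buffering into currentVersionFiles instead of appending straight to the result is what allows a whole commit to be dropped later, which is the follow-up capability mentioned above.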

*/
private List<IndexedFile> extractIndexedFilesFromBatch(
ColumnarBatch batch, long version, long startIndex) {
List<IndexedFile> indexedFiles = new ArrayList<>();
Contributor:

Use a LinkedList.

Contributor (Author):

Same rationale as above -- we are only doing addAll() and add(); ArrayList would be faster and more memory-efficient.

Arguments.of(
0L, BASE_INDEX, isInitialSnapshot, Optional.of(2L), Optional.of(5L), "v0 to v2 id:5"),
Arguments.of(
1L, 5L, isInitialSnapshot, Optional.of(3L), Optional.of(10L), "v1 id:5 to v3 id:10"),
@jerrypeng (Contributor), Oct 11, 2025:

What about to and from END_INDEX?

Contributor (Author):

Done. Thanks!

*/
@ParameterizedTest
@MethodSource("getFileChangesParameters")
public void testGetFileChanges(
Contributor:

Should we also test with other types of actions in the delta log?

Contributor (Author):

We do test REMOVEs & ADDs; I added METADATA too (which will now yield empty commits).

"Index mismatch at index %d: dsv1=%d, dsv2=%d",
i, deltaFile.index(), kernelFile.getIndex()));

String deltaPath = deltaFile.add() != null ? deltaFile.add().path() : null;
Collaborator:

nit: document that deltaFile.add() == null could happen when it is the starting/ending index

Contributor (Author):

Done.

@huan233usc (Collaborator) left a comment:

I think it is good as a starting point.

@zikangh requested a review from jerrypeng October 13, 2025 18:40
*
* <p>Indexed: refers to the index in DeltaSourceOffset, assigned by the streaming engine.
*/
public class IndexedFile {
@tdas (Contributor), Oct 15, 2025:

Is this duplicating any code that is already present in the v1 Delta connector?

Contributor (Author):

Yes, we are duplicating the Scala version of IndexedFile, because the logic is simple enough and the Scala version pulls in DeltaLog-only dependencies (AddFile and RemoveFile). If I created a common interface for AddFile and RemoveFile to bridge Kernel and DeltaLog, the code would be harder to maintain and more error-prone.

However, there are cases where refactoring is a clear win, e.g. DeltaSourceAdmissionBase and AdmissionLimits -- the logic is complex and a small refactor would make them work for both Kernel and DeltaLog classes.

W.r.t. duplicating vs. sharing code, I'm weighing the pros and cons on a case-by-case basis, following the principles I outlined above.

@tdas (Contributor) left a comment:

Very high-level approval. @jerrypeng, you are much closer to streaming than I have been for years; please take a detailed look 🙏

My general high-level question, not a concern, is: how much of this new code duplicates existing v1 connector code? Do we need to duplicate it, or can we refactor the existing code just enough for it to be referenced and reused in the v2 connector?

@zikangh (Contributor, Author) commented Oct 15, 2025

> My general high-level question, not a concern, is: how much of this new code duplicates existing v1 connector code? Do we need to duplicate it, or can we refactor the existing code just enough for it to be referenced and reused in the v2 connector?

Thanks @tdas
IndexedFile and these boundary checks are duplicated in this PR.

@jerrypeng (Contributor):

> My general high-level question, not a concern, is: how much of this new code duplicates existing v1 connector code? Do we need to duplicate it, or can we refactor the existing code just enough for it to be referenced and reused in the v2 connector?

@tdas, we would like to reuse existing v1 code as much as possible to reduce the risk of bugs and ease code reviews. However, due to the differences between the Delta Kernel API and the DeltaLog API (used in DSv1), some of the code may unfortunately have to be duplicated and rewritten.

Yes, this makes detailed reviews and good tests very important.

@huan233usc merged commit 68cf284 into delta-io:master Oct 16, 2025
27 checks passed